e7240e207db697fd0f82de28fb95351acb8e5daa,spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java,ConcurrentMessageListenerContainerTests,testAutoCommitWithRebalanceListener,#,136

Before Change


				new ConcurrentMessageListenerContainer<>(cf, containerProps);
		final CountDownLatch latch = new CountDownLatch(4);
		final List<String> listenerThreadNames = new ArrayList<>();
		containerProps.setMessageListener(new MessageListener<Integer, String>() {

			@Override
			public void onMessage(ConsumerRecord<Integer, String> message) {
				ConcurrentMessageListenerContainerTests.this.logger.info("auto: " + message);
				listenerThreadNames.add(Thread.currentThread().getName());
				latch.countDown();
			}
		});
		final CountDownLatch rebalancePartitionsAssignedLatch = new CountDownLatch(2);
		final CountDownLatch rebalancePartitionsRevokedLatch = new CountDownLatch(2);
		containerProps.setConsumerRebalanceListener(new ConsumerRebalanceListener() {

After Change


				new ConcurrentMessageListenerContainer<>(cf, containerProps);
		final CountDownLatch latch = new CountDownLatch(4);
		final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
		containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
			ConcurrentMessageListenerContainerTests.this.logger.info("auto: " + message);
			listenerThreadNames.add(Thread.currentThread().getName());
			latch.countDown();
		});
		final CountDownLatch rebalancePartitionsAssignedLatch = new CountDownLatch(2);
		final CountDownLatch rebalancePartitionsRevokedLatch = new CountDownLatch(2);
		containerProps.setConsumerRebalanceListener(new ConsumerRebalanceListener() {